#ifndef __TASK_RECOVERY_H__
#define __TASK_RECOVERY_H__

#include <semaphore.h>
#include "token_bucket.h"
#include "chunk.h"
#include "ec.h"
#include "timepoint.h"

// config dir: /dev/shm/lich4/nodectl

// OUT
// Relative path fragments without a leading '/'.
// NOTE(review): presumably resolved under the config dir named above — confirm.
#define RECOVERY             "recovery/"
#define RECOVERY_QOS_MODE    "recovery/qos_mode"

// Path suffixes, each starting with '/'.
// NOTE(review): presumably appended to a per-pool directory — confirm against the users.
#define RECOVERY_IMMEDIATELY "/immediately"
#define RECOVERY_INTERVAL    "/interval"
#define RECOVERY_THREAD      "/thread"
#define RECOVERY_FILL_RATE   "/fill_rate"
#define RECOVERY_INFO        "/info"
#define RECOVERY_DISK_INFO   "/disk_info"
#define RECOVERY_RECOVERY_TOTAL        "/recovery_total"
#define RECOVERY_DISK_RECOVERY_TOTAL   "/disk_recovery_total"

#define RECOVERY_NODE_ONLINE "/recovery_node_online"

// Numeric tunables. RECOVERY_THREAD_MAX is also the capacity of
// recovery_tp_t.segs[], so it bounds the number of per-thread slots.
#define RECOVERY_THREAD_NUMBER          10
#define RECOVERY_THREAD_MAX             30
#define RECOVERY_INTERVAL_DEFAULT_VAL   28800  // 8 hours, expressed in seconds
#define RECOVERY_FILL_RATE_DEFAULT_VAL  300
#define RECOVERY_NODE_CHANGE_FLAG       30

// NOTE(review): "STP" presumably refers to the scan thread pool (recovery_pool_t.stp) — confirm.
#define RECOVERY_STP_THREAD_NUM   5

// FILE_PROTO_EXTERN_ITEM_COUNT is not defined in this header; it must be
// visible wherever RECOVERY_SUBVOL_GROUP_SIZE is expanded.
#define RECOVERY_SUBVOL_GROUP_SIZE (10 * FILE_PROTO_EXTERN_ITEM_COUNT)
#define RECOVERY_RMQ_MAX_RECORD    100000

#define RECOVERY_RMQ_MAX_FD_COUNT 1000
#define RECOVERY_RMQ_POP_NUMBER   1000


// Flags that are all defined to 1.
// NOTE(review): likely used as compile-time feature switches — confirm how they are tested.
#define RECOVERY_GLOBAL_FLAG 1
#define VOL_REMOVE_FLAG      1
#define RECOVERY_IMMEDIATELY_ASYN 1


// Kinds of recovery request. No explicit values are given, so the
// enumerators are numbered 0..4 in declaration order; do not reorder them
// if the numeric values are stored or exchanged anywhere.
// NOTE(review): per the names, each value identifies what triggered the
// recovery (node offline, disk offline, pool removal, ...) — confirm.
typedef enum {
        __RECOVERY_NODE_OFFLINE_TYPE__,
        __RECOVERY_DISK_OFFLINE_TYPE__,
        __RECOVERY_POOL_REMOVE_TYPE__,
        __RECOVERY_CLUSTER_DISK_OFFLINE_TYPE__,
        __RECOVERY_IMMEDIATELY_TYPE__,
} recovery_types_t;

// Recovery status codes, numbered 0..5 in declaration order (no explicit
// values). The blank line separates two groups of enumerators that still
// share one numbering sequence.
// "REMOVEING" is a misspelling that is part of the identifier; it is kept
// because renaming would break every user of the constant.
typedef enum {
        __RECOVERY_WAITING__,
        __RECOVERY_SCANNING__,
        __RECOVERY_RUNNING__,
        __RECOVERY_REMOVEING__,

        // NOTE(review): presumably the "immediately" on/off states — confirm.
        __RECOVERY_IMMEDIATELY__,
        __RECOVERY_NOIMMEDIATELY__,
} recovery_status_t;

// Thread states, numbered from 0 in declaration order.
// _th_status2str() below gives the printable label for each one.
typedef enum {
        __R_TH_INIT__ = 0,
        __R_TH_RUN__,
        __R_TH_STOP__,
        __R_TH_STOPPED__
} thread_status_t;

/**
 * Translate a thread_status_t value into a short printable label.
 *
 * @param status  one of the __R_TH_*__ constants (any other int is accepted)
 * @return pointer to a string literal: "init", "running", "stop" or
 *         "stopped"; "unknown" for any value outside the enum. The result
 *         is never NULL and must not be freed or modified.
 */
static inline const char *_th_status2str(const int status)
{
        switch (status) {
        case __R_TH_INIT__:
                return "init";
        case __R_TH_RUN__:
                return "running";
        case __R_TH_STOP__:
                return "stop";
        case __R_TH_STOPPED__:
                return "stopped";
        default:
                return "unknown";
        }
}

// Limit member alignment to 4 bytes for rec_t only; the matching
// `#pragma pack()` after the struct restores the compiler default.
// Adding, removing or reordering members changes the packed layout.
#pragma pack(4)

// One record: a chunk id, its parent id, a raw chunk-info buffer and the
// replica count to reach. vec_rec_t (used by recovery_mq_t.pop2) is a
// vector of these.
typedef struct {
        fileid_t parent;              ///< consistent with the DB record
        chkid_t chkid;
        char chkinfo_buf[CHKINFO_MAX]; ///< NOTE(review): presumably holds a serialized chkinfo — confirm
        int repnum;                  ///< target repnum
} rec_t;

#pragma pack()

// Vector types instantiated through the vec_t() macro, which is not defined
// in this header. vec_rec_t is the output container of recovery_mq_t.pop2.
typedef vec_t(chkid_t) vec_chkid_t;
typedef vec_t(rec_t) vec_rec_t;

// Per-pool tunables, embedded in recovery_pool_t as `config`.
// NOTE(review): presumably filled in by load_fill_rate() / load_thread() — confirm.
typedef struct {
        int fill_rate;
        int thread;
        // int interval;
} recovery_config;

// One slot of recovery_tp_t.segs[].
typedef struct {
        void *pool;     ///< untyped; NOTE(review): presumably a recovery_pool_t * — confirm
        int idx;        ///< NOTE(review): presumably this slot's index in segs[] — confirm
        int status;     ///< NOTE(review): presumably a thread_status_t value — confirm
} recovery_seg_t;

// Per-pool output/statistics record, identified by pool_name.
// NOTE(review): presumably the state behind the recovery_out_*() functions
// declared in this header — confirm.
typedef struct {
        sy_rwlock_t lock;
        int refcount;
        int status;
        timerange1_t range;

        char pool_name[MAX_NAME_LEN];
        time_t lastscan;

        // Counters. NOTE(review): names match the recovery_out_add_*() API
        // (recovery, lost, success, fail) — confirm the mapping.
        uint64_t recovery;
        uint64_t offline;
        uint64_t lost;

        uint64_t success_total;
        uint64_t success;
        uint64_t fail;
        uint64_t speed;
} recovery_out_t;

// Result kinds with values 0, 1, 2.
// NOTE(review): presumably the `t` argument of recovery_tp_t.result — confirm.
// __T_RESULT_FAIL lacks the trailing "__" the other two carry; the name is
// kept as-is because it is part of the public interface.
enum {
        __T_RESULT_CHECK__ = 0,
        __T_RESULT_SUCCESS__,
        __T_RESULT_FAIL,
};

// Thread-pool object. recovery_pool_t embeds two of these: `stp` (scan) and
// `rtp` (recover). Operations are function-pointer members that receive the
// object itself as `self`; recovery_tp_init() is the declared initializer.
typedef struct __recovery_tp_t {
        sy_rwlock_t rwlock;
        char name[MAX_NAME_LEN];

        int started;
        int thread_num;
        recovery_seg_t segs[RECOVERY_THREAD_MAX];  ///< fixed capacity of RECOVERY_THREAD_MAX slots

        // Counters. NOTE(review): presumably updated through result() — confirm.
        uint64_t check;
        uint64_t success;
        uint64_t fail;
        uint64_t speed;

        uint64_t success_total;

        // thread_proc is declared outside this header.
        int (*start)(struct __recovery_tp_t *self, void *pool, int num, thread_proc proc);
        int (*stop)(struct __recovery_tp_t *self);
        int (*is_stopped)(struct __recovery_tp_t *self, int *count);

        int (*inc)(struct __recovery_tp_t *self);
        int (*dec)(struct __recovery_tp_t *self);

        // NOTE(review): `t` is presumably one of the __T_RESULT_* constants and
        // `n` a count to add — confirm against the implementation.
        int (*result)(struct __recovery_tp_t *self, int t, int n);
} recovery_tp_t;

// Extra per-chkid information carried alongside a chkid through scan_mq_t
// (push / get_addtion) and recovery_mq_item_t. The name "addtion" (sic) is
// part of the interface and is kept unchanged.
typedef struct {
        chkid_t parent;
        int repnum_usr;         // target repnum
        int repnum_sys;         // target repnum

        ec_t ec;
        int repnum_eclog;         // target repnum
} addtion_t;

// Per-volume struct data.
// A queue item that can be linked into a list via `hook` and that owns a
// backing file described by filename/fd/addr/cursor. Operations are
// function-pointer members taking the item as `self`;
// recovery_mq_item_create() is the declared constructor.
typedef struct __recovery_mq_item_t {
        struct list_head hook;

        void *pool;     ///< untyped; NOTE(review): presumably the owning recovery_pool_t * — confirm
        chkid_t chkid;
        addtion_t addtion;

        int scan_status;

        // current mq file
        // when full or end, add to pool file_list
        char filename[MAX_NAME_LEN];
        int fd;
        void *addr;     ///< NOTE(review): presumably a mapping of the file behind fd — confirm
        int cursor;

        int record_count;
        int record_max;

        int (*write)(struct __recovery_mq_item_t *self, void *buf, int len);
        int (*flush)(struct __recovery_mq_item_t *self);

        // `new` is an ordinary identifier in C, but a keyword in C++.
        int (*copy)(struct __recovery_mq_item_t *self, struct __recovery_mq_item_t **new);
        int (*reset)(struct __recovery_mq_item_t *self);

        void (*dump)(struct __recovery_mq_item_t *self, const char *name);
        int (*destroy)(struct __recovery_mq_item_t *self);
} recovery_mq_item_t;

// Queue object of the scan stage (recovery_pool_t.smq). It keeps a list of
// chkids plus a hash table mapping each chkid to its recovery_mq_item_t.
// Operations are function-pointer members taking the queue as `self`;
// scan_mq_init() is the declared initializer.
typedef struct __scan_mq_t {
        sy_rwlock_t rwlock;

        // scan meta

        // list of controller
        count_list_t chkid_list;

        // map chkid/controller to recovery_mq_item_t
        hashtable_t chkid_tab;

        // scan controller result
        uint64_t lost;
        uint64_t offline;
        uint64_t need_recovery;

        int (*push)(struct __scan_mq_t *self, void *pool, const chkid_t *chkid, const addtion_t *addtion);
        int (*pop)(struct __scan_mq_t *self, chkid_t *chkid);

        int (*get_addtion)(struct __scan_mq_t *self, const chkid_t *chkid, addtion_t *addtion);
        int (*get_length)(struct __scan_mq_t *self);
        int (*is_ready)(struct __scan_mq_t *self);
        int (*check_deleted_volume)(struct __scan_mq_t *self);
        int (*check_change_volume)(struct __scan_mq_t *self);

        int (*write)(struct __scan_mq_t *self, const chkid_t *chkid, void *buf, int len);
        // NOTE(review): `ret` is presumably the scan result code for `chkid` — confirm.
        int (*done)(struct __scan_mq_t *self, const chkid_t *chkid, int ret);

        int (*deinit)(struct __scan_mq_t *self);
        int (*destroy)(struct __scan_mq_t *self);

        // Calls back `func` with two untyped arguments; `ctx` is caller-supplied.
        // NOTE(review): which callback argument receives `ctx` is not visible here — confirm.
        int (*vol_iterator)(struct __scan_mq_t *self, int (*func)(void *, void *), void *ctx);
} scan_mq_t;

// Queue object of the recovery stage (recovery_pool_t.rmq). It holds a list
// of recovery_mq_item_t entries, each backed by a temporary file.
// Operations are function-pointer members taking the queue as `self`;
// recovery_mq_init() is the declared initializer.
typedef struct __recovery_mq_t {
        sy_rwlock_t rwlock;

        // list of recovery_mq_item_t, which has a tmp file
        count_list_t file_list;

        // for load tmp file
        recovery_mq_item_t *mq_item;

        // from scan stage
        // uint64_t total;
        uint64_t recovery;

        int (*push)(struct __recovery_mq_t *self, recovery_mq_item_t *vol);
        int (*pop)(struct __recovery_mq_t *self, recovery_mq_item_t **vol);
        // NOTE(review): presumably pops up to `len` records into `v` — confirm.
        int (*pop2)(struct __recovery_mq_t *self, int len, vec_rec_t *v);

        int (*get_length)(struct __recovery_mq_t *self);

        int (*deinit)(struct __recovery_mq_t *self);
} recovery_mq_t;

// Per-pool struct data.
// Aggregates everything kept for one pool: a scan stage (queue + thread
// pool), a recovery stage (queue + thread pool), status fields, the loaded
// configuration and a token bucket.
typedef struct {
        char name[MAX_NAME_LEN];

        // SEDA stage: scan
        scan_mq_t smq;
        recovery_tp_t stp;   ///< scan thread pool

        // SEDA stage: recovery
        recovery_mq_t rmq;
        recovery_tp_t rtp;   ///< recover thread pool

        sem_t sem;           ///< POSIX semaphore from <semaphore.h>
        time_t lastscan;

        int status;          ///< NOTE(review): presumably a recovery_status_t value — confirm
        int status_remove;
        // int status_immediately;
        int need_rescan;

        int init_flag;
        int node_number;

        recovery_config config;
        token_bucket_t token_bucket;  ///< NOTE(review): presumably rate-limits recovery using config.fill_rate — confirm

        // private
        int vfm_vol_count;
} recovery_pool_t;


// Two-valued preference setting with explicit values 0 and 1.
// NOTE(review): presumably the `mode` accepted by recovery_qos_set_mode() — confirm.
typedef enum {
        PREFER_APP      = 0,
        PREFER_RECOVERY = 1
} recovery_mode_t;

// Global flag; only declared here, the definition lives in a .c file.
extern int is_recovery_started;

// NOTE(review): the empty parentheses declare these functions without a
// prototype (before C23), so the compiler does not check call arguments.
// Switch to `(void)` once the definitions are confirmed to take none.
int recovery_init();
int recovery_is_running();

// from recovery_flag.c
// Global-flag API: init, get/set and wait/post pairs.
// NOTE(review): all are declared with empty parentheses, so argument lists
// are unchecked before C23; confirm the real signatures (in particular
// whether recovery_global_flag_set takes a value) before adding `(void)`.
int recovery_global_flag_init();

int recovery_global_flag_get();
int recovery_global_flag_set();

int recovery_global_flag_wait();
int recovery_global_flag_post();

// from recovery_config.c
int recovery_config_init();

int recovery_load_interval();

void recovery_config_dump(recovery_pool_t *_ent);

// The common_* variants take a pool name and an `is_disk` selector; the
// plain variants take the pool object.
// NOTE(review): presumably the plain variants forward to the common_* ones
// with the pool's name and token bucket — confirm.
int common_load_fill_rate(const char *pool, token_bucket_t *tb, int is_disk);
int load_fill_rate(recovery_pool_t *_ent);

int common_load_thread(const char *pool, int is_disk);
int load_thread(recovery_pool_t *_ent);

int recovery_immediately(recovery_pool_t *_ent);
int recovery_immediately_unregister(const char *pool);
int recovery_delete_all_pool_file(recovery_pool_t *_ent);

// recovery_wakeup_one_pool has a two-void-pointer callback shape.
// NOTE(review): `type` is presumably a recovery_types_t value — confirm.
int recovery_wakeup_one_pool(void *pool_name, void *_arg);
int recovery_wakeup_all_pool(int type);

void recovery_dump(recovery_pool_t *_ent, int new_status);

int recovery_node_get(recovery_pool_t *_ent);
void recovery_node_offline_set(recovery_pool_t *_ent);
// int recovery_node_online_all_pool();

// Initializers / constructors for the queue and thread-pool objects above.
// NOTE(review): presumably these install the function-pointer members of
// the respective structs — confirm in the implementations.
int scan_mq_init(scan_mq_t *smq, const char *name);

int recovery_mq_init(recovery_mq_t *mq);

// Returns the new item through `item`.
// NOTE(review): ownership and release (item->destroy?) are not visible here — confirm.
int recovery_mq_item_create(recovery_mq_item_t **item, recovery_pool_t *pool,
                            const chkid_t *chkid, const addtion_t *addtion);

int recovery_tp_init(recovery_tp_t *tp, const char *name);

// qos

// NOTE(review): pool_name is a non-const `char *` here although the other
// pool-name parameters in this header are `const char *`; check whether the
// definitions modify it before tightening the type.
int set_recovery_total(char *pool_name, uint64_t recovery_total, int is_disk);
uint64_t get_recovery_total(char *pool_name, int is_disk);

// int set_disk_recovery_total(char *pool_name, uint64_t recovery_total);
// uint64_t get_disk_recovery_total(char *pool_name);

// NOTE(review): `mode` is presumably a recovery_mode_t value — confirm.
int recovery_qos_set_mode(int mode);

int recovery_qos_apply(const char *name);

// recovery out
// Apart from recovery_out_init(), every function here is keyed by pool name.
// NOTE(review): presumably they operate on that pool's recovery_out_t — confirm.

int recovery_out_init();

// `inc` is the amount to add to the named counter.
int recovery_out_add_recovery(const char *pool, int inc);
int recovery_out_add_lost(const char *pool, int inc);

int recovery_out_add_success(const char *pool, int inc);
int recovery_out_add_fail(const char *pool, int inc);

int recovery_out_set_status(const char *pool, int status);
int recovery_out_flush(const char *pool);

// The speed is returned through the `speed` out-parameter.
int recovery_out_get_speed(const char *pool, uint64_t *speed);


#endif
